{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Data Wrangler Feature Store Notebook"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use this notebook to create a feature group and add features to an offline or online\n",
    "feature store using a Data Wrangler .flow file.\n",
    "\n",
    "A single *feature* corresponds to a column in your dataset. A *feature group* is a predefined\n",
    "schema for a collection of features - each feature in the feature group has a specified data\n",
    "type and name. A single *record* in a feature group corresponds to a row in your datataframe.\n",
    "A *feature store* is a collection of feature groups.\n",
    "\n",
    "This notebook uses Amazon SageMaker Feature Store (Feature Store) to create a feature group\n",
    "and ingest data into feature store. To learn more about SageMaker Feature Store, see\n",
    "[Amazon Feature Store Documentation](http://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html).\n",
    "\n",
    "To create a feature group, you will create the following resources:\n",
    "* A feature definition using a schema, record identifier, and event-time feature name.\n",
    "* An online or offline store configuration.\n",
    "\n",
    "You will use a processing job to process your data at scale and ingest the data into this feature group."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, use the following cell to install dependencies."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# SageMaker Python SDK version 2.x is required\n",
    "import sagemaker\n",
    "import subprocess\n",
    "import sys\n",
    "\n",
    "original_version = sagemaker.__version__\n",
    "if sagemaker.__version__ != \"2.20.0\":\n",
    "    subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"sagemaker==2.20.0\"])\n",
    "    import importlib\n",
    "\n",
    "    importlib.reload(sagemaker)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import uuid\n",
    "import json\n",
    "import time\n",
    "import boto3\n",
    "import sagemaker"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parameters\n",
    "The following lists configurable parameters that are used throughout this notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# S3 bucket for saving processing job outputs\n",
    "# Feel free to specify a different bucket here if you wish.\n",
    "sess = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "prefix = \"data_wrangler_flows\"\n",
    "flow_id = f\"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}\"\n",
    "flow_name = f\"flow-{flow_id}\"\n",
    "flow_uri = f\"s3://{bucket}/{prefix}/{flow_name}.flow\"\n",
    "\n",
    "flow_file_name = \"bias_and_explainability.flow\"\n",
    "\n",
    "iam_role = sagemaker.get_execution_role()\n",
    "\n",
    "container_uri = \"663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.1.1\"\n",
    "\n",
    "# Processing Job Resources Configurations\n",
    "processing_job_name = f\"data-wrangler-feature-store-processing-{flow_id}\"\n",
    "processing_dir = \"/opt/ml/processing\"\n",
    "\n",
    "# URL to use for sagemaker client.\n",
    "# If this is None, boto will automatically construct the appropriate URL to use\n",
    "# when communicating with sagemaker.\n",
    "sagemaker_endpoint_url = None"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Push Flow to S3\n",
    "Use the following cell to upload the Data Wrangler .flow file to Amazon S3 so that\n",
    "it can be used as an input to the processing job."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load .flow file\n",
    "with open(flow_file_name) as f:\n",
    "    flow = json.load(f)\n",
    "\n",
    "# Upload to S3\n",
    "s3_client = boto3.client(\"s3\")\n",
    "s3_client.upload_file(flow_file_name, bucket, f\"{prefix}/{flow_name}.flow\")\n",
    "\n",
    "print(f\"Data Wrangler Flow notebook uploaded to {flow_uri}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Feature Group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "feature_group_name = f\"FG-{flow_name}\"\n",
    "print(f\"Feature Group Name: {feature_group_name}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following cell maps types between Data Wrangler supported types and Feature Store\n",
    "supported types (`String`, `Fractional`, and `Integral`). The default type is set to `String`.\n",
    "This means that, if a column in your dataset is not a `float` or `long` type,\n",
    "it will default to `String` in your Feature Store."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "datawrangler_FG_type_mapping = {\"float\": \"Fractional\", \"long\": \"Integral\"}\n",
    "\n",
    "# Some schema types in Data Wrangler are not supported by Feature Store.\n",
    "# Feature store supports String, Integral, and Fractional types.\n",
    "# The following will create a default_FG_type set to String for these types.\n",
    "default_FG_type = \"String\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following is a list of the column names and data types of the final dataset that will be produced\n",
    "when your data flow is used to process your input dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "column_schema = [\n",
    "    {\"name\": \"marketplace\", \"type\": \"string\"},\n",
    "    {\"name\": \"customer_id\", \"type\": \"long\"},\n",
    "    {\"name\": \"review_id\", \"type\": \"string\"},\n",
    "    {\"name\": \"product_id\", \"type\": \"string\"},\n",
    "    {\"name\": \"product_parent\", \"type\": \"long\"},\n",
    "    {\"name\": \"product_title\", \"type\": \"string\"},\n",
    "    {\"name\": \"star_rating\", \"type\": \"long\"},\n",
    "    {\"name\": \"helpful_votes\", \"type\": \"long\"},\n",
    "    {\"name\": \"total_votes\", \"type\": \"long\"},\n",
    "    {\"name\": \"vine\", \"type\": \"string\"},\n",
    "    {\"name\": \"verified_purchase\", \"type\": \"string\"},\n",
    "    {\"name\": \"review_headline\", \"type\": \"string\"},\n",
    "    {\"name\": \"review_body\", \"type\": \"string\"},\n",
    "    {\"name\": \"year\", \"type\": \"object\"},\n",
    "    {\"name\": \"review_date\", \"type\": \"date\"},\n",
    "    {\"name\": \"product_category\", \"type\": \"string\"},\n",
    "]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Select Record identifier and Event time feature name. These are required parameters for feature group\n",
    "creation.\n",
    "* **Record identifier name** is the name of the feature whose value uniquely identiﬁes a Record\n",
    "deﬁned in the feature group's feature definitions.\n",
    "* **Event time feature name** is the name of the EventTime of a Record in FeatureGroup.\n",
    "A EventTime is point in time when a new event occurs that corresponds to the creation or update of a\n",
    "Record in FeatureGroup. All Records in the FeatureGroup must have a corresponding EventTime."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "record_identifier_name = None\n",
    "if record_identifier_name is None:\n",
    "    raise RuntimeError(\"Select a column name as the feature group identifier.\")\n",
    "\n",
    "event_time_feature_name = None\n",
    "if event_time_feature_name is None:\n",
    "    raise RuntimeError(\"Select a column name as the event time feature name.\")\n",
    "\n",
    "# Below you map the schema detected from Data Wrangler to Feature Group Types.\n",
    "feature_definitions = [\n",
    "    {\"FeatureName\": schema[\"name\"], \"FeatureType\": datawrangler_FG_type_mapping.get(schema[\"type\"], default_FG_type)}\n",
    "    for schema in column_schema\n",
    "]\n",
    "print(feature_definitions)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following are your online and offline store configurations. You enable an online\n",
    "store by setting `EnableOnlineStore` to `True`. The offline store is located in an\n",
    "Amazon S3 bucket in your account. To update the bucket used, update the\n",
    "parameter `bucket` in the second code cell in this notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sagemaker_client = boto3.client(\"sagemaker\", endpoint_url=sagemaker_endpoint_url)\n",
    "\n",
    "# Online Store Configuration\n",
    "online_store_config = {\"EnableOnlineStore\": True}\n",
    "\n",
    "# Offline Store Configuration\n",
    "s3_uri = \"s3://\" + bucket  # this is the default bucket defined in previous cells\n",
    "offline_store_config = {\"S3StorageConfig\": {\"S3Uri\": s3_uri}}\n",
    "\n",
    "# Create Feature Group\n",
    "create_fg_response = sagemaker_client.create_feature_group(\n",
    "    FeatureGroupName=feature_group_name,\n",
    "    EventTimeFeatureName=event_time_feature_name,\n",
    "    RecordIdentifierFeatureName=record_identifier_name,\n",
    "    FeatureDefinitions=feature_definitions,\n",
    "    OnlineStoreConfig=online_store_config,\n",
    "    OfflineStoreConfig=offline_store_config,\n",
    "    RoleArn=iam_role,\n",
    ")\n",
    "\n",
    "# Describe Feature Group\n",
    "status = sagemaker_client.describe_feature_group(FeatureGroupName=feature_group_name)\n",
    "while status[\"FeatureGroupStatus\"] != \"Created\":\n",
    "    if status[\"FeatureGroupStatus\"] == \"CreateFailed\":\n",
    "        raise RuntimeError(f\"Feature Group Creation Failed: {status}\")\n",
    "    status = sagemaker_client.describe_feature_group(FeatureGroupName=feature_group_name)\n",
    "    print(\"Feature Group Status: \" + status[\"FeatureGroupStatus\"])\n",
    "    time.sleep(3)\n",
    "\n",
    "print(status)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use the following code cell to define helper functions for creating inputs to\n",
    "a processing job."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_flow_notebook_processing_input(base_dir, flow_s3_uri):\n",
    "    return {\n",
    "        \"InputName\": \"flow\",\n",
    "        \"S3Input\": {\n",
    "            \"LocalPath\": f\"{base_dir}/flow\",\n",
    "            \"S3Uri\": flow_s3_uri,\n",
    "            \"S3DataType\": \"S3Prefix\",\n",
    "            \"S3InputMode\": \"File\",\n",
    "        },\n",
    "    }\n",
    "\n",
    "\n",
    "def create_s3_processing_input(base_dir, name, dataset_definition):\n",
    "    return {\n",
    "        \"InputName\": name,\n",
    "        \"S3Input\": {\n",
    "            \"LocalPath\": f\"{base_dir}/{name}\",\n",
    "            \"S3Uri\": dataset_definition[\"s3ExecutionContext\"][\"s3Uri\"],\n",
    "            \"S3DataType\": \"S3Prefix\",\n",
    "            \"S3InputMode\": \"File\",\n",
    "        },\n",
    "    }\n",
    "\n",
    "\n",
    "def create_redshift_processing_input(base_dir, name, dataset_definition):\n",
    "    return {\n",
    "        \"InputName\": name,\n",
    "        \"DatasetDefinition\": {\n",
    "            \"RedshiftDatasetDefinition\": {\n",
    "                \"ClusterId\": dataset_definition[\"clusterIdentifier\"],\n",
    "                \"Database\": dataset_definition[\"database\"],\n",
    "                \"DbUser\": dataset_definition[\"dbUser\"],\n",
    "                \"QueryString\": dataset_definition[\"queryString\"],\n",
    "                \"ClusterRoleArn\": dataset_definition[\"unloadIamRole\"],\n",
    "                \"OutputS3Uri\": f'{dataset_definition[\"s3OutputLocation\"]}{name}/',\n",
    "                \"OutputFormat\": dataset_definition[\"outputFormat\"].upper(),\n",
    "            },\n",
    "            \"LocalPath\": f\"{base_dir}/{name}\",\n",
    "        },\n",
    "    }\n",
    "\n",
    "\n",
    "def create_athena_processing_input(base_dir, name, dataset_definition):\n",
    "    return {\n",
    "        \"InputName\": name,\n",
    "        \"DatasetDefinition\": {\n",
    "            \"AthenaDatasetDefinition\": {\n",
    "                \"Catalog\": dataset_definition[\"catalogName\"],\n",
    "                \"Database\": dataset_definition[\"databaseName\"],\n",
    "                \"QueryString\": dataset_definition[\"queryString\"],\n",
    "                \"OutputS3Uri\": f'{dataset_definition[\"s3OutputLocation\"]}{name}/',\n",
    "                \"OutputFormat\": dataset_definition[\"outputFormat\"].upper(),\n",
    "            },\n",
    "            \"LocalPath\": f\"{base_dir}/{name}\",\n",
    "        },\n",
    "    }\n",
    "\n",
    "\n",
    "def create_processing_inputs(processing_dir, flow, flow_uri):\n",
    "    \"\"\"Helper function for creating processing inputs\n",
    "    :param flow: loaded data wrangler flow notebook\n",
    "    :param flow_uri: S3 URI of the data wrangler flow notebook\n",
    "    \"\"\"\n",
    "    processing_inputs = []\n",
    "    flow_processing_input = create_flow_notebook_processing_input(processing_dir, flow_uri)\n",
    "    processing_inputs.append(flow_processing_input)\n",
    "\n",
    "    for node in flow[\"nodes\"]:\n",
    "        if \"dataset_definition\" in node[\"parameters\"]:\n",
    "            data_def = node[\"parameters\"][\"dataset_definition\"]\n",
    "            name = data_def[\"name\"]\n",
    "            source_type = data_def[\"datasetSourceType\"]\n",
    "\n",
    "            if source_type == \"S3\":\n",
    "                s3_processing_input = create_s3_processing_input(processing_dir, name, data_def)\n",
    "                processing_inputs.append(s3_processing_input)\n",
    "            elif source_type == \"Athena\":\n",
    "                athena_processing_input = create_athena_processing_input(processing_dir, name, data_def)\n",
    "                processing_inputs.append(athena_processing_input)\n",
    "            elif source_type == \"Redshift\":\n",
    "                redshift_processing_input = create_redshift_processing_input(processing_dir, name, data_def)\n",
    "                processing_inputs.append(redshift_processing_input)\n",
    "            else:\n",
    "                raise ValueError(f\"{source_type} is not supported for Data Wrangler Processing.\")\n",
    "    return processing_inputs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start ProcessingJob\n",
    "Now, the Processing Job is submitted to a boto client. The status of the processing job is\n",
    "monitored with the boto client, and this notebook waits until the job is no longer 'InProgress'."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Processing job name\n",
    "print(f\"Processing Job Name: {processing_job_name}\")\n",
    "\n",
    "processingResources = {\"ClusterConfig\": {\"InstanceCount\": 1, \"InstanceType\": \"ml.m5.4xlarge\", \"VolumeSizeInGB\": 30}}\n",
    "\n",
    "appSpecification = {\"ImageUri\": container_uri}\n",
    "\n",
    "sagemaker_client.create_processing_job(\n",
    "    ProcessingInputs=create_processing_inputs(processing_dir, flow, flow_uri),\n",
    "    ProcessingOutputConfig={\n",
    "        \"Outputs\": [\n",
    "            {\n",
    "                \"OutputName\": \"14039109-2da9-49b4-8eee-df39306c9c47.default\",\n",
    "                \"FeatureStoreOutput\": {\"FeatureGroupName\": feature_group_name},\n",
    "                \"AppManaged\": True,\n",
    "            }\n",
    "        ],\n",
    "    },\n",
    "    ProcessingJobName=processing_job_name,\n",
    "    ProcessingResources=processingResources,\n",
    "    AppSpecification=appSpecification,\n",
    "    RoleArn=iam_role,\n",
    ")\n",
    "\n",
    "\n",
    "status = sagemaker_client.describe_processing_job(ProcessingJobName=processing_job_name)\n",
    "\n",
    "while status[\"ProcessingJobStatus\"] in (\"InProgress\", \"Failed\"):\n",
    "    if status[\"ProcessingJobStatus\"] == \"Failed\":\n",
    "        raise RuntimeError(f\"Processing Job failed: {status}\")\n",
    "    status = sagemaker_client.describe_processing_job(ProcessingJobName=processing_job_name)\n",
    "    print(status[\"ProcessingJobStatus\"])\n",
    "    time.sleep(60)\n",
    "\n",
    "print(status)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cleanup\n",
    "Uncomment the following code cell to revert the SageMaker Python SDK to the original version used\n",
    "before running this notebook. This notebook upgrades the SageMaker Python SDK to 2.x, which may\n",
    "cause other example notebooks to break. To learn more about the changes introduced in the\n",
    "SageMaker Python SDK 2.x update, see\n",
    "[Use Version 2.x of the SageMaker Python SDK.](https://sagemaker.readthedocs.io/en/stable/v2.html)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# _ = subprocess.check_call(\n",
    "#     [sys.executable, \"-m\", \"pip\", \"install\", f\"sagemaker=={original_version}\"]\n",
    "# )"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
